package com.offcn.bigdata.streaming.p1

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Try

/**
  * Introductory SparkStreaming example.
  *     Entry point:
  *         StreamingContext
  *     Goal: read data from a socket and perform a word count.
  *  A socket data source can be simulated with netcat.
  *  Installing netcat:
  *     rpm -ivh nc-1.84-24.el6.x86_64.rpm
  *     yum -y install nc
  *
  *  Programming model: DStream
  *     DStream is to SparkStreaming what RDD is to SparkCore, or DataFrame to SparkSQL.
  *     A DStream (discretized stream)
  *         can be thought of as a List[RDD] — a sequence of RDDs.
  *     DStream.map
  *     List[RDD].map(rdd => rdd.map()): List[RDD] --> DStream
  */
object _01StreamingWordCountApp {
    /**
      * Reads lines from a TCP socket, splits them into words, and prints a
      * per-batch word count to stdout.
      *
      * @param args exactly two arguments: the host and the port of the socket
      *             source (e.g. a `nc -lk <port>` server). Exits with -1 on
      *             missing arguments or a non-numeric / out-of-range port.
      */
    def main(args: Array[String]): Unit = {
        if(args == null || args.length != 2) {
            println(
                """
                  |Usage: <host> <port>
                """.stripMargin)
            System.exit(-1)
        }
        val Array(host, port) = args
        // Validate the port up front instead of letting port.toInt throw a
        // raw NumberFormatException deep inside socketTextStream's setup.
        val portNum: Int = Try(port.toInt).toOption.filter(p => p > 0 && p <= 65535) match {
            case Some(p) => p
            case None =>
                println(s"Invalid port: '$port' (expected an integer in 1-65535)")
                System.exit(-1)
                -1 // unreachable, keeps the expression type Int
        }

        val conf = new SparkConf()
                    .setAppName("_01StreamingWordCountApp")
                    .setMaster("local[*]")
        // Micro-batch interval: one RDD of received lines every 2 seconds.
        val batchInterval = Seconds(2)
        val ssc = new StreamingContext(conf, batchInterval)
        // Receiver-based socket source. Serialized, replicated (x2), spills to
        // disk — protects received data if the receiver's executor is lost.
        val input: ReceiverInputDStream[String] = ssc.socketTextStream(host, portNum, storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
        val words: DStream[String] = input.flatMap(line => line.split("\\s+"))
        val pairs: DStream[(String, Int)] = words.map(word => (word, 1))
        // Per-batch aggregation only; no state is carried across batches.
        val ret: DStream[(String, Int)]  = pairs.reduceByKey(_+_)

        // print() is the output operation that triggers execution of the DAG.
        ret.print()

        // start() has a side effect, so call it with parentheses (Scala convention).
        ssc.start()
        // Block the main thread until the streaming job is stopped or fails.
        ssc.awaitTermination()
    }
}
